package com.opencee.cloud.msg.service.impl;

import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.opencee.cloud.msg.api.constatns.MsgTaskStatus;
import com.opencee.cloud.msg.api.constatns.MsgTaskType;
import com.opencee.cloud.msg.api.entity.MsgTaskEntity;
import com.opencee.cloud.msg.api.vo.DelayedMessage;
import com.opencee.cloud.msg.mapper.MsgTaskMapper;
import com.opencee.cloud.msg.service.MsgTaskService;
import com.opencee.cloud.msg.utils.MqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static cn.hutool.core.date.DatePattern.NORM_DATETIME_FORMAT;

/**
 * Service implementation for message-sending tasks.
 *
 * <p>Handles pushing tasks onto the delayed message queue, cancelling
 * pending tasks, and the scheduled bulk load of tasks into the queue.
 *
 * @author liuyadu
 * @date 2022-02-09
 */
@Slf4j
@Service
public class MsgTaskServiceImpl extends SupperServiceImpl<MsgTaskMapper, MsgTaskEntity> implements MsgTaskService {
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Adds a task to the delayed message queue.
     *
     * <p>Only tasks whose status is {@code CREATED} and that carry a delayed
     * time in the future are accepted. A task due today is marked
     * {@code WAITING}, published to the delayed queue and persisted; a task
     * due on a later day is accepted but not published here (see
     * {@link #scheduledAddDelayedQueue(Integer)}).
     *
     * @param entity the task to enqueue; may be {@code null}
     * @return {@code false} when the task is {@code null}, is not in
     *         {@code CREATED} status, has no delayed time, has an unknown
     *         task type, or its delayed time is not in the future;
     *         {@code true} otherwise (including the "not due today" case
     *         where nothing is published)
     * @throws Exception declared by the service interface; propagates any
     *                   failure raised while publishing or updating
     */
    @Override
    public boolean addDelayedQueue(MsgTaskEntity entity) throws Exception {
        if (entity == null) {
            return false;
        }
        // Guard against a null status: previously this dereference threw an NPE.
        if (entity.getStatus() == null || !entity.getStatus().equals(MsgTaskStatus.CREATED.getValue())) {
            return false;
        }
        if (entity.getDelayedTime() == null) {
            return false;
        }
        MsgTaskType taskType = MsgTaskType.getByValue(entity.getType());
        // NOTE(review): getByValue presumably returns null for an unknown type —
        // confirm. Without this guard the log statement below would NPE.
        if (taskType == null) {
            log.warn("任务id:{} 未知的任务类型:{},不加入消息队列", entity.getId(), entity.getType());
            return false;
        }
        log.debug("任务id: {} 队列路由: {} 队列名: {} 延迟时间:{}", entity.getId(), taskType.getDelayedQueueRK(), taskType.getDelayedQueue(), DateUtil.format(entity.getDelayedTime(), NORM_DATETIME_FORMAT));

        // Remaining delay in milliseconds; tasks already due are not enqueued.
        long times = entity.getDelayedTime().getTime() - System.currentTimeMillis();
        if (times <= 0) {
            log.debug("任务id:{} 小于当前时间,不加入消息队列", entity.getId());
            return false;
        }

        // Tasks due today go straight to the delayed queue; tasks due on a
        // later day are left for the scheduled job to pick up.
        if (DateUtil.isSameDay(new Date(), entity.getDelayedTime())) {
            // Mark as WAITING before publishing so the message payload carries the new status.
            entity.setStatus(MsgTaskStatus.WAITING.getValue());
            DelayedMessage delayedMessage = new DelayedMessage("", taskType.getDelayedQueueRK(), null, times, entity);
            MqUtil.delayed(amqpTemplate, delayedMessage);
            this.updateById(entity);
            log.debug("任务id:{} 当天的定时任务,加入消息队列", entity.getId());
        } else {
            log.debug("任务id:{} 非当天的定时任务,不加入消息队列", entity.getId());
        }
        return true;
    }

    /**
     * Cancels a delayed send.
     *
     * <p>Sets the status to {@code CANCELED} and stamps the cancel time on
     * the task with the given id, but only if it is still in
     * {@code CREATED} or {@code WAITING} status.
     *
     * @param taskId id of the task to cancel
     * @return the result of the conditional update
     */
    @Override
    public boolean cancelDelayed(Long taskId) {
        QueryWrapper<MsgTaskEntity> wrapper = new QueryWrapper<>();
        wrapper.lambda().eq(MsgTaskEntity::getId, taskId)
                .in(MsgTaskEntity::getStatus, MsgTaskStatus.CREATED.getValue(), MsgTaskStatus.WAITING.getValue());
        MsgTaskEntity record = new MsgTaskEntity();
        record.setStatus(MsgTaskStatus.CANCELED.getValue());
        record.setCancelTime(new Date());
        return update(record, wrapper);
    }

    /**
     * Scheduled job: loads delayed tasks ahead of time and adds them to the
     * delayed queue, to avoid pushing a huge number of records into the
     * queue at once.
     *
     * <p>Each task is processed independently; a failure on one task is
     * logged and does not stop the remaining tasks.
     *
     * @param beforeDay how many days ahead to load; defaults to 1 when
     *                  {@code null}
     */
    @Override
    public void scheduledAddDelayedQueue(Integer beforeDay) {
        if (beforeDay == null) {
            beforeDay = 1;
        }
        // Query parameters consumed by the mapper statement
        // (NOTE(review): exact filter semantics live in the mapper — confirm there).
        Map<String, Object> map = new HashMap<>();
        map.put("before", beforeDay);
        map.put("status", MsgTaskStatus.CREATED.getValue());
        List<MsgTaskEntity> list = baseMapper.selectListByMap(map);
        if (list == null || list.isEmpty()) {
            return;
        }
        for (MsgTaskEntity task : list) {
            try {
                addDelayedQueue(task);
            } catch (Exception e) {
                // Log through SLF4J (with the task id and stack trace) instead of
                // printStackTrace, and continue with the remaining tasks.
                log.error("任务id:{} 加入延迟队列失败", task.getId(), e);
            }
        }
    }
}
